1use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, Mutex};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use rusqlite::{params, Connection};
13
14use super::cache::CacheEntry;
15use super::types::{FileMeta, FileOpts, WaveFile};
16use crate::backend::storage::error::StoreError;
17use crate::backend::storage::migrations::{
18 check_schema_compat, run_filestore_migrations, stamp_version, FILESTORE_SCHEMA_VERSION,
19};
20
21pub(super) const PART_DATA_SIZE: usize = 64 * 1024;
23
/// Period, in seconds, of the background flush loop started by
/// `FileStore::start_flusher`.
// `start_flusher` is itself `allow(dead_code)`, so this constant may be unused in some builds.
#[allow(dead_code)]
pub const DEFAULT_FLUSH_SECS: u64 = 5;

/// Idle time, in seconds, after which a clean cache entry is treated as stale
/// and dropped by `FileStore::flush_cache`.
// `flush_cache` is itself `allow(dead_code)`, so this constant may be unused in some builds.
#[allow(dead_code)]
pub const CACHE_TTL_SECS: u64 = 60;

/// Default cap (128 MiB) on the accounted size of the in-memory cache.
pub const MAX_CACHE_BYTES: usize = 128 << 20;
35
/// SQLite-backed file store with an in-memory, byte-capped cache of file entries.
///
/// The methods in this impl always lock `cache` before `cache_total_bytes`
/// when they hold both.
pub struct FileStore {
    /// Connection to the backing database (tables `db_wave_file` and `db_file_data`).
    pub(super) conn: Mutex<Connection>,
    /// Cache entries keyed by `(zone_id, name)`.
    pub(super) cache: Mutex<HashMap<(String, String), CacheEntry>>,
    /// Running sum of `cached_size_bytes` over the entries in `cache`,
    /// kept in step by the methods that insert, resize or remove entries.
    pub(super) cache_total_bytes: Mutex<usize>,
    /// Byte cap for `cache_total_bytes`; exceeding it makes `evict_to_cap` drop
    /// least-recently-accessed entries.
    pub(super) cache_max_bytes: usize,
}
45
46impl FileStore {
47 pub fn open(path: &Path) -> Result<Self, StoreError> {
49 let conn = Connection::open(path)?;
50 Self::configure_and_migrate(conn)
51 }
52
53 #[allow(dead_code)]
55 pub fn open_in_memory() -> Result<Self, StoreError> {
56 let conn = Connection::open_in_memory()?;
57 Self::configure_and_migrate(conn)
58 }
59
60 #[allow(dead_code)]
62 pub fn open_in_memory_with_cap(max_bytes: usize) -> Result<Self, StoreError> {
63 let conn = Connection::open_in_memory()?;
64 let mut store = Self::configure_and_migrate(conn)?;
65 store.cache_max_bytes = max_bytes;
66 Ok(store)
67 }
68
69 fn configure_and_migrate(conn: Connection) -> Result<Self, StoreError> {
70 conn.execute_batch(
71 "PRAGMA journal_mode=WAL;
72 PRAGMA busy_timeout=5000;",
73 )?;
74 check_schema_compat(&conn, FILESTORE_SCHEMA_VERSION, "filestore.db")?;
78 run_filestore_migrations(&conn)?;
79 stamp_version(&conn, FILESTORE_SCHEMA_VERSION)?;
80 Ok(Self {
81 conn: Mutex::new(conn),
82 cache: Mutex::new(HashMap::new()),
83 cache_total_bytes: Mutex::new(0),
84 cache_max_bytes: MAX_CACHE_BYTES,
85 })
86 }
87
88 pub(super) fn now_ms() -> i64 {
89 SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap_or_default()
92 .as_millis() as i64
93 }
94
95 pub(super) fn evict_to_cap(&self) {
98 let total = *self.cache_total_bytes.lock().unwrap();
100 if total <= self.cache_max_bytes {
101 return;
102 }
103
104 let candidates: Vec<(i64, (String, String), usize)> = {
106 let cache = self.cache.lock().unwrap();
107 cache
108 .iter()
109 .map(|(k, e)| (e.last_access_ms, k.clone(), e.cached_size_bytes))
110 .collect()
111 };
112
113 let mut candidates = candidates;
115 candidates.sort_by_key(|(ts, _, _)| *ts);
116
117 let mut evicted_count = 0usize;
118 let mut evicted_bytes = 0usize;
119
120 for (_, key, size) in candidates {
121 {
122 let total = *self.cache_total_bytes.lock().unwrap();
123 if total <= self.cache_max_bytes {
124 break;
125 }
126 }
127 {
128 let mut cache = self.cache.lock().unwrap();
129 if cache.remove(&key).is_some() {
130 let mut total = self.cache_total_bytes.lock().unwrap();
131 *total = total.saturating_sub(size);
132 evicted_count += 1;
133 evicted_bytes += size;
134 }
135 }
136 }
137
138 if evicted_count > 0 {
139 tracing::debug!(
140 "filestore lru: evicted {} entries, freed {} bytes (cap={})",
141 evicted_count,
142 evicted_bytes,
143 self.cache_max_bytes,
144 );
145 }
146 }
147
148 #[allow(dead_code)]
150 pub fn make_file(
151 &self,
152 zone_id: &str,
153 name: &str,
154 meta: FileMeta,
155 opts: FileOpts,
156 ) -> Result<(), StoreError> {
157 let now = Self::now_ms();
158 let file = WaveFile {
159 zoneid: zone_id.to_string(),
160 name: name.to_string(),
161 size: 0,
162 createdts: now,
163 modts: now,
164 opts,
165 meta,
166 };
167
168 let conn = self.conn.lock().unwrap();
169 let exists: bool = conn
170 .query_row(
171 "SELECT 1 FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
172 params![zone_id, name],
173 |_| Ok(true),
174 )
175 .unwrap_or(false);
176
177 if exists {
178 return Err(StoreError::AlreadyExists);
179 }
180
181 let opts_json = serde_json::to_string(&file.opts)?;
182 let meta_json = serde_json::to_string(&file.meta)?;
183 conn.execute(
184 "INSERT INTO db_wave_file (zoneid, name, size, createdts, modts, opts, meta) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
185 params![file.zoneid, file.name, file.size, file.createdts, file.modts, opts_json, meta_json],
186 )?;
187
188 let key = (zone_id.to_string(), name.to_string());
190 let entry = CacheEntry {
191 file: Some(file),
192 data_entries: HashMap::new(),
193 dirty: false,
194 last_access_ms: now,
195 cached_size_bytes: 64, };
197 {
198 let mut cache = self.cache.lock().unwrap();
199 cache.insert(key, entry);
200 *self.cache_total_bytes.lock().unwrap() += 64;
201 }
202 self.evict_to_cap();
203
204 Ok(())
205 }
206
207 #[allow(dead_code)]
209 pub fn delete_file(&self, zone_id: &str, name: &str) -> Result<(), StoreError> {
210 let conn = self.conn.lock().unwrap();
211 conn.execute(
212 "DELETE FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
213 params![zone_id, name],
214 )?;
215 conn.execute(
216 "DELETE FROM db_file_data WHERE zoneid = ?1 AND name = ?2",
217 params![zone_id, name],
218 )?;
219 drop(conn);
220
221 let key = (zone_id.to_string(), name.to_string());
223 let mut cache = self.cache.lock().unwrap();
224 if let Some(removed) = cache.remove(&key) {
225 let mut total = self.cache_total_bytes.lock().unwrap();
226 *total = total.saturating_sub(removed.cached_size_bytes);
227 }
228
229 Ok(())
230 }
231
232 #[allow(dead_code)]
234 pub fn delete_zone(&self, zone_id: &str) -> Result<(), StoreError> {
235 let names: Vec<String> = {
237 let conn = self.conn.lock().unwrap();
238 let mut stmt = conn.prepare("SELECT name FROM db_wave_file WHERE zoneid = ?1")?;
239 let rows = stmt.query_map(params![zone_id], |row| row.get(0))?;
240 rows.filter_map(|r| r.ok()).collect()
241 };
242
243 let conn = self.conn.lock().unwrap();
244 conn.execute(
245 "DELETE FROM db_wave_file WHERE zoneid = ?1",
246 params![zone_id],
247 )?;
248 conn.execute(
249 "DELETE FROM db_file_data WHERE zoneid = ?1",
250 params![zone_id],
251 )?;
252 drop(conn);
253
254 let mut cache = self.cache.lock().unwrap();
255 let mut freed = 0usize;
256 for name in names {
257 if let Some(removed) = cache.remove(&(zone_id.to_string(), name)) {
258 freed += removed.cached_size_bytes;
259 }
260 }
261 if freed > 0 {
262 let mut total = self.cache_total_bytes.lock().unwrap();
263 *total = total.saturating_sub(freed);
264 }
265
266 Ok(())
267 }
268
269 pub fn stat(&self, zone_id: &str, name: &str) -> Result<Option<WaveFile>, StoreError> {
271 let key = (zone_id.to_string(), name.to_string());
273 {
274 let mut cache = self.cache.lock().unwrap();
275 if let Some(entry) = cache.get_mut(&key) {
276 entry.last_access_ms = Self::now_ms();
277 return Ok(entry.file.clone());
278 }
279 }
280
281 let conn = self.conn.lock().unwrap();
283 let result = conn.query_row(
284 "SELECT zoneid, name, size, createdts, modts, opts, meta FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
285 params![zone_id, name],
286 |row| {
287 let opts_str: String = row.get(5)?;
288 let meta_str: String = row.get(6)?;
289 Ok(WaveFile {
290 zoneid: row.get(0)?,
291 name: row.get(1)?,
292 size: row.get(2)?,
293 createdts: row.get(3)?,
294 modts: row.get(4)?,
295 opts: serde_json::from_str(&opts_str).unwrap_or_default(),
296 meta: serde_json::from_str(&meta_str).unwrap_or_default(),
297 })
298 },
299 );
300
301 match result {
302 Ok(file) => Ok(Some(file)),
303 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
304 Err(e) => Err(StoreError::Sqlite(e)),
305 }
306 }
307
308 pub fn write_file(
310 &self,
311 zone_id: &str,
312 name: &str,
313 data: &[u8],
314 ) -> Result<(), StoreError> {
315 let key = (zone_id.to_string(), name.to_string());
316 let now = Self::now_ms();
317
318 let parts = Self::split_into_parts(data);
320
321 let conn = self.conn.lock().unwrap();
323
324 let exists: bool = conn
326 .query_row(
327 "SELECT 1 FROM db_wave_file WHERE zoneid = ?1 AND name = ?2",
328 params![zone_id, name],
329 |_| Ok(true),
330 )
331 .unwrap_or(false);
332 if !exists {
333 return Err(StoreError::NotFound);
334 }
335
336 conn.execute(
338 "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
339 params![data.len() as i64, now, zone_id, name],
340 )?;
341
342 conn.execute(
344 "DELETE FROM db_file_data WHERE zoneid = ?1 AND name = ?2",
345 params![zone_id, name],
346 )?;
347 for (idx, part_data) in parts.iter().enumerate() {
348 conn.execute(
349 "INSERT INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
350 params![zone_id, name, idx as i32, part_data],
351 )?;
352 }
353 drop(conn);
354
355 {
357 let new_size = data.len().max(64);
358 let mut cache = self.cache.lock().unwrap();
359 if let Some(entry) = cache.get_mut(&key) {
360 let old_size = entry.cached_size_bytes;
361 if let Some(ref mut file) = entry.file {
362 file.size = data.len() as i64;
363 file.modts = now;
364 }
365 entry.last_access_ms = now;
366 entry.cached_size_bytes = new_size;
367 let delta = new_size as i64 - old_size as i64;
368 let mut total = self.cache_total_bytes.lock().unwrap();
369 if delta >= 0 {
370 *total += delta as usize;
371 } else {
372 *total = total.saturating_sub((-delta) as usize);
373 }
374 }
375 }
376 self.evict_to_cap();
377
378 Ok(())
379 }
380
381 pub fn read_file(&self, zone_id: &str, name: &str) -> Result<Option<Vec<u8>>, StoreError> {
383 let file = match self.stat(zone_id, name)? {
385 Some(f) => f,
386 None => return Ok(None),
387 };
388
389 if file.size == 0 {
390 return Ok(Some(Vec::new()));
391 }
392
393 let data_len = file.data_length();
394 let start_idx = file.data_start_idx();
395 let num_parts = ((start_idx + data_len - 1) / PART_DATA_SIZE as i64 + 1) as i32;
396 let start_part = (start_idx / PART_DATA_SIZE as i64) as i32;
397
398 let conn = self.conn.lock().unwrap();
400 let mut stmt = conn.prepare(
401 "SELECT partidx, data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 ORDER BY partidx",
402 )?;
403 let rows = stmt.query_map(params![zone_id, name], |row| {
404 Ok((row.get::<_, i32>(0)?, row.get::<_, Vec<u8>>(1)?))
405 })?;
406
407 let mut parts_map: HashMap<i32, Vec<u8>> = HashMap::new();
408 for row in rows {
409 let (idx, data) = row?;
410 parts_map.insert(idx, data);
411 }
412 drop(stmt);
413 drop(conn);
414
415 let mut result = Vec::with_capacity(data_len as usize);
417 for part_idx in start_part..start_part + num_parts {
418 if let Some(part_data) = parts_map.get(&part_idx) {
419 let part_start = part_idx as i64 * PART_DATA_SIZE as i64;
420 let skip = if part_start < start_idx {
421 (start_idx - part_start) as usize
422 } else {
423 0
424 };
425 let remaining = data_len as usize - result.len();
426 let take = remaining.min(part_data.len() - skip);
427 result.extend_from_slice(&part_data[skip..skip + take]);
428 }
429 }
430
431 let _ = (num_parts, start_part); Ok(Some(result))
433 }
434
435 pub fn append_data(
437 &self,
438 zone_id: &str,
439 name: &str,
440 data: &[u8],
441 ) -> Result<(), StoreError> {
442 if data.is_empty() {
443 return Ok(());
444 }
445
446 let key = (zone_id.to_string(), name.to_string());
447 let now = Self::now_ms();
448
449 let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
450 let new_size = file.size + data.len() as i64;
451
452 let start_offset = file.size;
454 let start_part = (start_offset / PART_DATA_SIZE as i64) as i32;
455 let offset_in_part = (start_offset % PART_DATA_SIZE as i64) as usize;
456
457 let conn = self.conn.lock().unwrap();
459 let mut data_offset = 0usize;
460 let mut current_part = start_part;
461
462 if offset_in_part > 0 {
463 let existing: Option<Vec<u8>> = conn
465 .query_row(
466 "SELECT data FROM db_file_data WHERE zoneid = ?1 AND name = ?2 AND partidx = ?3",
467 params![zone_id, name, start_part],
468 |row| row.get(0),
469 )
470 .ok();
471
472 let mut part_data = existing.unwrap_or_default();
473 let space = PART_DATA_SIZE - part_data.len();
474 let to_copy = space.min(data.len());
475 part_data.extend_from_slice(&data[..to_copy]);
476 data_offset = to_copy;
477
478 conn.execute(
479 "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
480 params![zone_id, name, current_part, part_data],
481 )?;
482 current_part += 1;
483 }
484
485 while data_offset < data.len() {
487 let end = (data_offset + PART_DATA_SIZE).min(data.len());
488 let part_data = &data[data_offset..end];
489 conn.execute(
490 "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
491 params![zone_id, name, current_part, part_data],
492 )?;
493 data_offset = end;
494 current_part += 1;
495 }
496
497 conn.execute(
499 "UPDATE db_wave_file SET size = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
500 params![new_size, now, zone_id, name],
501 )?;
502 drop(conn);
503
504 {
506 let new_size_bytes = (new_size as usize).max(64);
507 let mut cache = self.cache.lock().unwrap();
508 if let Some(entry) = cache.get_mut(&key) {
509 let old_size = entry.cached_size_bytes;
510 if let Some(ref mut f) = entry.file {
511 f.size = new_size;
512 f.modts = now;
513 }
514 entry.last_access_ms = now;
515 entry.cached_size_bytes = new_size_bytes;
516 let delta = new_size_bytes as i64 - old_size as i64;
517 let mut total = self.cache_total_bytes.lock().unwrap();
518 if delta >= 0 {
519 *total += delta as usize;
520 } else {
521 *total = total.saturating_sub((-delta) as usize);
522 }
523 }
524 }
525 self.evict_to_cap();
526
527 Ok(())
528 }
529
530 pub fn write_meta(
533 &self,
534 zone_id: &str,
535 name: &str,
536 meta: FileMeta,
537 merge: bool,
538 ) -> Result<(), StoreError> {
539 let key = (zone_id.to_string(), name.to_string());
540 let now = Self::now_ms();
541
542 let file = self.stat(zone_id, name)?.ok_or(StoreError::NotFound)?;
543
544 let new_meta = if merge {
545 let mut merged = file.meta.clone();
546 for (k, v) in meta {
547 if v.is_null() {
548 merged.remove(&k);
549 } else {
550 merged.insert(k, v);
551 }
552 }
553 merged
554 } else {
555 meta
556 };
557
558 let meta_json = serde_json::to_string(&new_meta)?;
559 let conn = self.conn.lock().unwrap();
560 conn.execute(
561 "UPDATE db_wave_file SET meta = ?1, modts = ?2 WHERE zoneid = ?3 AND name = ?4",
562 params![meta_json, now, zone_id, name],
563 )?;
564 drop(conn);
565
566 let mut cache = self.cache.lock().unwrap();
568 if let Some(entry) = cache.get_mut(&key) {
569 if let Some(ref mut f) = entry.file {
570 f.meta = new_meta;
571 f.modts = now;
572 }
573 entry.last_access_ms = now;
574 }
575
576 Ok(())
577 }
578
579 #[allow(dead_code)]
581 pub fn list_files(&self, zone_id: &str) -> Result<Vec<WaveFile>, StoreError> {
582 let conn = self.conn.lock().unwrap();
583 let mut stmt = conn.prepare(
584 "SELECT zoneid, name, size, createdts, modts, opts, meta FROM db_wave_file WHERE zoneid = ?1",
585 )?;
586 let rows = stmt.query_map(params![zone_id], |row| {
587 let opts_str: String = row.get(5)?;
588 let meta_str: String = row.get(6)?;
589 Ok(WaveFile {
590 zoneid: row.get(0)?,
591 name: row.get(1)?,
592 size: row.get(2)?,
593 createdts: row.get(3)?,
594 modts: row.get(4)?,
595 opts: serde_json::from_str(&opts_str).unwrap_or_default(),
596 meta: serde_json::from_str(&meta_str).unwrap_or_default(),
597 })
598 })?;
599
600 rows.collect::<Result<Vec<_>, _>>()
601 .map_err(StoreError::Sqlite)
602 }
603
604 #[allow(dead_code)]
606 pub fn get_all_zone_ids(&self) -> Result<Vec<String>, StoreError> {
607 let conn = self.conn.lock().unwrap();
608 let mut stmt = conn.prepare("SELECT DISTINCT zoneid FROM db_wave_file")?;
609 let rows = stmt.query_map([], |row| row.get(0))?;
610 rows.collect::<Result<Vec<_>, _>>()
611 .map_err(StoreError::Sqlite)
612 }
613
614 #[allow(dead_code)]
617 pub fn flush_cache(&self) -> Result<(usize, usize), StoreError> {
618 let ttl_ms = (CACHE_TTL_SECS * 1000) as i64;
619 let now = Self::now_ms();
620 let cutoff_ms = now - ttl_ms;
621
622 let (dirty_keys, stale_keys): (Vec<_>, Vec<_>) = {
623 let cache = self.cache.lock().unwrap();
624 let dirty = cache
625 .iter()
626 .filter(|(_, e)| e.dirty)
627 .map(|(k, _)| k.clone())
628 .collect();
629 let stale = cache
630 .iter()
631 .filter(|(_, e)| !e.dirty && e.last_access_ms < cutoff_ms)
632 .map(|(k, _)| k.clone())
633 .collect();
634 (dirty, stale)
635 };
636
637 if !stale_keys.is_empty() {
639 let mut freed = 0usize;
640 let mut cache = self.cache.lock().unwrap();
641 for key in &stale_keys {
642 if let Some(removed) = cache.remove(key) {
643 freed += removed.cached_size_bytes;
644 }
645 }
646 if freed > 0 {
647 let mut total = self.cache_total_bytes.lock().unwrap();
648 *total = total.saturating_sub(freed);
649 }
650 tracing::debug!("filestore cache: evicted {} stale entries ({} bytes)", stale_keys.len(), freed);
651 }
652
653 let mut files_flushed = 0;
654 let mut parts_flushed = 0;
655
656 for key in dirty_keys {
657 let entry = {
658 let mut cache = self.cache.lock().unwrap();
659 let entry = cache.remove(&key);
660 if let Some(ref e) = entry {
661 let mut total = self.cache_total_bytes.lock().unwrap();
662 *total = total.saturating_sub(e.cached_size_bytes);
663 }
664 entry
665 };
666
667 if let Some(entry) = entry {
668 if let Some(ref file) = entry.file {
669 let conn = self.conn.lock().unwrap();
670 let meta_json = serde_json::to_string(&file.meta)?;
671 conn.execute(
672 "UPDATE db_wave_file SET size = ?1, modts = ?2, meta = ?3 WHERE zoneid = ?4 AND name = ?5",
673 params![file.size, file.modts, meta_json, file.zoneid, file.name],
674 )?;
675
676 for data_entry in entry.data_entries.values() {
677 conn.execute(
678 "REPLACE INTO db_file_data (zoneid, name, partidx, data) VALUES (?1, ?2, ?3, ?4)",
679 params![file.zoneid, file.name, data_entry.part_idx, data_entry.data],
680 )?;
681 parts_flushed += 1;
682 }
683 files_flushed += 1;
684 }
685 }
686 }
687
688 Ok((files_flushed, parts_flushed))
689 }
690
691 fn split_into_parts(data: &[u8]) -> Vec<Vec<u8>> {
693 if data.is_empty() {
694 return Vec::new();
695 }
696 data.chunks(PART_DATA_SIZE)
697 .map(|chunk| chunk.to_vec())
698 .collect()
699 }
700
701 #[allow(dead_code)]
703 pub fn start_flusher(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
704 let store = Arc::clone(self);
705 tokio::spawn(async move {
706 let mut interval =
707 tokio::time::interval(std::time::Duration::from_secs(DEFAULT_FLUSH_SECS));
708 loop {
709 interval.tick().await;
710 match store.flush_cache() {
711 Ok((files, parts)) => {
712 if files > 0 {
713 tracing::debug!(
714 "filestore flush: {} files, {} parts",
715 files,
716 parts
717 );
718 }
719 }
720 Err(e) => {
721 tracing::error!("filestore flush error: {}", e);
722 }
723 }
724 }
725 })
726 }
727}